package com.yzg.study.kafka.common.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

@Component
@Slf4j
public class KafkaMsgUtils {

    @Resource
    KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 创建kafka监听者
     */
    ProducerListener<String, Object> producerListener = new ProducerListener() {
        public void onSuccess(ProducerListener<String, Object> produceRecord, RecordMetadata recordMetadata) { log.info("Message Send Success:{}-{}-{}", produceRecord.toString());
        }
    };

    /**
     * 发送消息
     *
     * @param topic
     * @param data
     */
    public void send(String topic, Object data) {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send(topic, data);
    }

    public Set<String> topicList() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        AdminClient adminClient = KafkaAdminClient.create(props);
        ListTopicsResult topicList = adminClient.listTopics();
        return topicList.names().get();
    }

}
